Martin Durkac
10/18/2021, 7:25 AM
Mourad Hamou-Mamar
10/18/2021, 8:27 AM
import os
import sys
from pathlib import Path
parent_path = Path(__file__).resolve().parent
sys.path.append(os.path.relpath(parent_path))
My structure looks like this:
.
├── flow.py
├── functions
│   ├── function_mapping
│   │   ├── function_a.py
│   │   └── function_b.py
│   └── function_transform
│       ├── function_c.py
│       └── function_d.py
└── task
    ├── mapping
    │   └── task_a.py
    └── transform
        ├── task_b.py
        └── task_c.py
In my flow.py, I import my task like this:
# in flow.py
from task.mapping.task_a import task_a
And it works for the task module. But in my task, when I try to use my functions, it doesn't find them:
# in task_a.py
from functions.function_mapping.function_a import function_a
I always get the error message: ModuleNotFoundError: No module named 'functions'
I don't understand why it can't find my functions module, since I import from the root directory and that directory is added to sys.path.
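One thing worth ruling out, hedged because the thread does not show how the flow is actually run: os.path.relpath converts the absolute parent_path back into a path relative to the current working directory, and a relative sys.path entry is resolved against whatever the working directory is when the import happens. Under an agent or inside a container, that is not necessarily the project root. A minimal sketch of the usual alternative, inserting the absolute path at the front of sys.path; add_project_root is a hypothetical helper, and the temporary directory only mimics the functions/function_mapping layout above.

```python
import sys
import tempfile
from pathlib import Path


def add_project_root(root: Path) -> str:
    """Insert the absolute project root at the front of sys.path (idempotent)."""
    root_str = str(root.resolve())
    if root_str not in sys.path:
        sys.path.insert(0, root_str)
    return root_str


# Throwaway layout mirroring functions/function_mapping/function_a.py,
# with __init__.py files so both folders are regular packages.
tmp_root = Path(tempfile.mkdtemp())
pkg_dir = tmp_root / "functions" / "function_mapping"
pkg_dir.mkdir(parents=True)
for package in (tmp_root / "functions", pkg_dir):
    (package / "__init__.py").write_text("")
(pkg_dir / "function_a.py").write_text("def function_a():\n    return 'a'\n")

# In flow.py this would be add_project_root(Path(__file__).resolve().parent).
add_project_root(tmp_root)

from functions.function_mapping.function_a import function_a  # noqa: E402

print(function_a())  # a
```

Because the entry is absolute, the import no longer depends on the directory the process was started from; it still only helps imports that run in the same process after this line.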
If anyone has an idea of how I could make it work, or what I should try in order to debug this, it would be greatly appreciated.
Thanks everyone in advance.
Marko Herkaliuk
10/18/2021, 8:43 AM
start_date = Parameter("start_date", default=pendulum.now("Europe/Kiev").add(days=-1).to_date_string())
start_date = Parameter("start_date", default=(date.today() - timedelta(1)).strftime("%Y-%m-%d"))
and usually everything worked as needed, i.e. the value of the parameter was calculated when the script was executed.
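Both defaults above are ordinary Python expressions, so each is evaluated once, when the flow file itself is executed (for example when it is loaded for registration), not again at every flow run. If nothing reloads the file, the default stays pinned to "yesterday" as of that load; this is a plausible explanation for the behaviour described next, not a confirmed diagnosis. A common workaround, also used by the calculate_flow_end_date task further down, is to default the Parameter to None and compute the date inside a task at run time. The pure-Python sketch below shows the difference; both function names and FROZEN_DEFAULT are hypothetical.

```python
from datetime import date, timedelta
from typing import Optional

# Evaluated ONCE, when this module is executed (the analogue of the flow file
# being loaded at registration time). Later calls reuse this frozen string.
FROZEN_DEFAULT = (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")


def start_date_frozen(start_date: str = FROZEN_DEFAULT) -> str:
    """Returns the default computed at import time, however late it is called."""
    return start_date


def start_date_at_runtime(start_date: Optional[str] = None,
                          today: Optional[date] = None) -> str:
    """Computes 'yesterday' each time it runs, unless an explicit value is passed."""
    if start_date is not None:
        return start_date
    today = today or date.today()
    return (today - timedelta(days=1)).strftime("%Y-%m-%d")


print(start_date_frozen())                              # 'yesterday' as of import
print(start_date_at_runtime(today=date(2021, 10, 18)))  # 2021-10-17
print(start_date_at_runtime("2021-10-14"))              # explicit value wins
```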
However, I noticed that on Saturday, Sunday and Monday (today) all similar parameters were passed as 2021-10-14, i.e. as if the flow had been launched on Friday. Last weekend this was not the case (nor, in general, before that).
Sergey Shamsuyarov
10/18/2021, 9:05 AM
Thomas Furmston
10/18/2021, 9:49 AM
Thomas Furmston
10/18/2021, 9:49 AM
import logging
import pendulum
import prefect
from prefect import Flow, Parameter, task
from prefect.storage import Docker
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import StartFlowRun
logger = logging.getLogger(__name__)
weekday_schedule = CronSchedule(
    '41 10 * * 1-5',
    start_date=pendulum.now(tz='Europe/London')
)
@task
def calculate_flow_end_date(end_date: str):
    if end_date is not None:
        return end_date
    return prefect.context.get('scheduled_start_time').to_date_string()
common_flow = StartFlowRun(
    flow_name='already_existing_flow1',
    project_name='my_project_name',
    wait=True,
)
baseline_flow = StartFlowRun(
    flow_name='already_existing_flow2',
    project_name='my_project_name',
    wait=True,
)
with Flow("my_scheduled_flow",
          schedule=weekday_schedule,
          storage=Docker(
              base_image='my-docker-image:latest',
              local_image=True,
          )) as flow:
    num_days_parameter = Parameter('num_days', default=1)
    num_back_fill_days_parameter = Parameter('num_back_fill_days', default=1)
    end_date_parameter = Parameter('end_date', default=None)
    task_end_date = calculate_flow_end_date(end_date_parameter)
    logger.info('Task End Date: %s', task_end_date)
    logger.info('Num Days: %s', num_days_parameter)
    logger.info('Num Days Backfill: %s', num_back_fill_days_parameter)
    common_flow_result = common_flow(parameters={
        'num_days': num_days_parameter,
        'num_back_fill_days': num_back_fill_days_parameter,
        'end_date': task_end_date,
    })
    baseline_flow_result = baseline_flow(
        upstream_tasks=[common_flow_result],
        parameters={
            'num_days': num_days_parameter,
            'num_back_fill_days': num_back_fill_days_parameter,
            'end_date': task_end_date,
        }
    )
Thomas Furmston
10/18/2021, 9:50 AM
Thomas Furmston
10/18/2021, 9:50 AM
with Flow(
        'common_flow',
        storage=Docker(
            base_image='my-docker-image:latest',
            local_image=True,
        )) as flow:
    num_days_parameter = Parameter('num_days', required=True)
    num_back_fill_days_parameter = Parameter('num_back_fill_days', required=True)
    end_date_parameter = Parameter('end_date', required=True)
    served_ads_command = ShellTask(
        name='my_first_task',
        command=construct_etl_command(
            app_mode=settings.app_mode,
            table_name='task1',
            num_days_cli_arg='num_back_fill_days',
            num_days=num_back_fill_days_parameter,
            end_date=end_date_parameter,
            database=settings.database,
        ),
        stream_output=True,
    )
Thomas Furmston
10/18/2021, 9:51 AM
Thomas Furmston
10/18/2021, 9:51 AM
[2021-10-18 09:30:15+0000] INFO - prefect.served_advert_task | /tmp/prefect-ufbr5w5b: line 1: Parameter:: No such file or directory
[2021-10-18 09:30:15+0000] ERROR - prefect.served_advert_task | Command failed with exit code 1
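The log line is consistent with the command string having been built at flow-definition time: inside with Flow(...), num_back_fill_days_parameter and end_date_parameter are Parameter objects rather than values, so interpolating them into a string embeds their repr (something of the shape <Parameter: end_date>), and bash then reads "<Parameter:" as an input redirect from a file named "Parameter:". The sketch below reproduces that with a stand-in class; FakeParameter, build_etl_command and the run_etl command are hypothetical, not Prefect or project code.

```python
class FakeParameter:
    """Hypothetical stand-in for a Prefect 1.x Parameter inside `with Flow(...)`."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        # Same shape as the repr suggested by the log above.
        # f-strings fall back to __repr__ because no __str__ is defined.
        return f"<Parameter: {self.name}>"


def build_etl_command(table_name: str, num_days, end_date) -> str:
    """Hypothetical analogue of construct_etl_command: plain string formatting."""
    return f"run_etl --table {table_name} --num-days {num_days} --end-date {end_date}"


# At flow-definition time the arguments are Parameter objects, not values.
build_time_cmd = build_etl_command(
    "task1", FakeParameter("num_back_fill_days"), FakeParameter("end_date")
)
print(build_time_cmd)
# run_etl --table task1 --num-days <Parameter: num_back_fill_days> --end-date <Parameter: end_date>
# bash parses "<Parameter:" as "redirect stdin from a file named 'Parameter:'",
# which fails with "Parameter:: No such file or directory".

# At run time (inside a task) the same function receives real values.
run_time_cmd = build_etl_command("task1", 1, "2021-10-18")
print(run_time_cmd)
# run_etl --table task1 --num-days 1 --end-date 2021-10-18
```

In Prefect 1.x, one way to get the run-time behaviour is to turn the command builder into a @task and pass its result when calling the ShellTask inside the flow, e.g. served_ads_command(command=etl_command), since ShellTask.run accepts a command argument. This is a suggestion, not something confirmed in the thread.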
Thomas Furmston
10/18/2021, 9:52 AM
Eric Feldman
10/18/2021, 11:23 AM
Barbara Abi Khoriati
10/18/2021, 1:14 PM
Eddie
10/18/2021, 2:21 PM
wait_for_flow_run tasks to fail if the child flow run was a failure? I know that wait_for_flow_run returns a FlowRunView, so I assume it is possible to define another task that raises an exception if the run view's state is_failed(), but I am curious whether there is already an interface for this behavior in the built-in tasks.
Constantino Schillebeeckx
10/18/2021, 6:08 PM
Is prefect.context.get("scheduled_start_time") timezone-aware? If not, is there an assumed timezone of UTC? If I schedule my flow with something like CronSchedule(cron, start_date=pendulum.datetime(2021, 1, 1, tz=tz)), will "scheduled_start_time" have the same timezone as the cron schedule?
Rowan Gaffney
10/18/2021, 6:21 PM
Anatoly Alekseev
10/18/2021, 6:23 PM
Ben Muller
10/19/2021, 12:44 AM
with Flow(
    name="horse_racing_data",
) as flow:
    dates = get_dates_task(days_back=days_back, days_ahead=days_ahead, dt_format="%d-%b-%Y")
    raw_sectional_data = apply_map(get_puntingform_sectional_dump_task, date=dates)
    spell_stats_data = apply_map(
        query_db_for_df_task, path_to_sql=unmapped("sql/select_spell_count.sql")
    )
    enriched_pf_data = apply_map(
        calculate_runners_spell_stats_task, pf_sectional_df=raw_sectional_data, spell_data=spell_stats_data
    )
I am making multiple separate apply_map calls, and I just wanted to make sure: when calling calculate_runners_spell_stats_task, can I rely on the order of the returned maps?
What I mean is that raw_sectional_data and spell_stats_data are iterables, and as they are passed to the function it is important that they keep the same order.
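On the ordering question: as far as I know, Prefect 1.x mapping is index-based, so the i-th mapped child receives the i-th element of every mapped input and the mapped results come back in input order; lists produced by earlier mapped tasks over the same dates therefore stay aligned, as long as nothing upstream filters or reorders them. That is the same contract as Python's built-in map over several iterables, sketched below with made-up data; enrich is a hypothetical stand-in for calculate_runners_spell_stats_task.

```python
from typing import Dict, List

dates = ["01-Oct-2021", "02-Oct-2021", "03-Oct-2021"]

# Two independently produced lists, both built by iterating over `dates`.
raw_sectional: List[Dict] = [{"date": d, "runners": 8 + i} for i, d in enumerate(dates)]
spell_stats: List[Dict] = [{"date": d, "spells": 10 * i} for i, d in enumerate(dates)]


def enrich(sectional: Dict, spell: Dict) -> Dict:
    """Hypothetical stand-in: combines the i-th element of each input."""
    if sectional["date"] != spell["date"]:
        raise ValueError(f"misaligned inputs: {sectional['date']} vs {spell['date']}")
    return {**sectional, "spells": spell["spells"]}


# map() pairs elements by position and yields results in input order.
enriched = list(map(enrich, raw_sectional, spell_stats))
print([row["date"] for row in enriched])
# ['01-Oct-2021', '02-Oct-2021', '03-Oct-2021']
```

Carrying the date through each element, as here, makes it cheap to assert alignment inside the enriching task instead of relying on position alone.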
Am I all good here?
Gabi Pi
10/19/2021, 6:40 AM
Eric Feldman
10/19/2021, 8:46 AM
prefect.core.flow.Flow.serialized_hash documentation that if the hash of the flow didn’t change, it won’t be uploaded to the server when calling register.
But I get the same hash every time, and the server keeps getting new versions of the flow 🤔
Stefano Cascavilla
10/19/2021, 9:24 AM
StartFlowRun.run(), this error is shown:
Error during execution of task: ClientError([{'message': '[{\'extensions\': {\'path\': \'$.selectionSet.insert_task_run_artifact.args.objects\', \'code\': \'constraint-violation\'}, \'message\': \'Not-NULL violation. null value in column "tenant_id" violates not-null constraint\'}]', 'locations': [{'line': 2, 'column': 5}], 'path': ['create_task_run_artifact'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': '[{\'extensions\': {\'path\': \'$.selectionSet.insert_task_run_artifact.args.objects\', \'code\': \'constraint-violation\'}, \'message\': \'Not-NULL violation. null value in column "tenant_id" violates not-null constraint\'}]'}}}])
We are migrating from version 0.14.22, and we are using server as the backend, locally.
Can anybody help?
Lukáš Polák
10/19/2021, 9:35 AM
Mark Fickett
10/19/2021, 1:39 PM
haf
10/19/2021, 2:56 PM
Thomas Furmston
10/19/2021, 3:01 PM
Thomas Furmston
10/19/2021, 3:02 PM
Kevin
10/19/2021, 8:09 PM
AZURE_STORAGE_CONNECTION_STRING environment variable or save connection string as Prefect secret.')
John Jacoby
10/19/2021, 8:33 PM
Matt Alhonte
10/19/2021, 9:54 PM
run_config based on params, and then kicked off from the Outer Flow
2. Spinning up a Dask Cluster from within the Flow and submitting tasks to it (and the Flow itself just runs in a small container) https://docs.prefect.io/core/idioms/resource-manager.html https://docs.prefect.io/api/latest/tasks/resources.html
3. Maybe start experimenting with Adaptive Scaling for Dask Clusters?
haf
10/20/2021, 8:06 AM
haf
10/20/2021, 8:06 AM
Noah Holm
10/20/2021, 8:08 AM
haf
10/20/2021, 8:08 AM
Noah Holm
10/20/2021, 8:09 AM
haf
10/20/2021, 8:10 AM
Noah Holm
10/20/2021, 8:12 AM
haf
10/20/2021, 8:12 AM
Noah Holm
10/20/2021, 8:13 AM
haf
10/20/2021, 8:15 AM
Noah Holm
10/20/2021, 8:17 AM
haf
10/20/2021, 8:17 AM
from argparse import ArgumentParser
from os import path
parser = ArgumentParser(add_help=False)
parser.add_argument(
    "--debug",
    default=False,
    required=False,
    action="store_true",
    dest="debug",
    help="debug flag",
)
subparser = parser.add_subparsers(dest="command")
register = subparser.add_parser("register")
run = subparser.add_parser("run")
register.add_argument("-c", "--commit-ref", dest="commit_ref", type=str, required=True)
register.add_argument("-p", "--project-name", dest="project_name", type=str, default="dbt")
register.add_argument("-l", "--labels", action="append", default=[])
register.add_argument("--build", dest="build", action="store_true", default=False)
run.add_argument(
    "--run-on-schedule", dest="run_on_schedule", action="store_true", default=False
)
run.add_argument(
    "--basepath", dest="basepath", type=str, default=path.dirname(path.realpath(__file__))
)
args = parser.parse_args()
--labels prod
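For reference, a trimmed-down, self-contained copy of the parser above (only the register subcommand), showing how an invocation with --labels prod is parsed: each -l/--labels occurrence is appended to args.labels, and top-level flags such as --debug have to come before the subcommand name. The commit ref abc123 and the second label k8s are made-up values.

```python
from argparse import ArgumentParser


def build_parser() -> ArgumentParser:
    """Trimmed copy of the parser above: top-level --debug plus `register`."""
    parser = ArgumentParser(add_help=False)
    parser.add_argument("--debug", default=False, action="store_true", dest="debug")
    subparser = parser.add_subparsers(dest="command")
    register = subparser.add_parser("register")
    register.add_argument("-c", "--commit-ref", dest="commit_ref", type=str, required=True)
    register.add_argument("-p", "--project-name", dest="project_name", type=str, default="dbt")
    register.add_argument("-l", "--labels", action="append", default=[])
    return parser


args = build_parser().parse_args(
    ["--debug", "register", "-c", "abc123", "--labels", "prod", "-l", "k8s"]
)
print(args.command, args.commit_ref, args.project_name, args.labels, args.debug)
# register abc123 dbt ['prod', 'k8s'] True
```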
Noah Holm
10/20/2021, 8:18 AM
haf
10/20/2021, 8:19 AM
Noah Holm
10/20/2021, 8:20 AM
haf
10/20/2021, 8:34 AM
Noah Holm
10/20/2021, 8:35 AM
haf
10/20/2021, 8:35 AM
Noah Holm
10/20/2021, 8:36 AM
haf
10/20/2021, 8:38 AM
Noah Holm
10/20/2021, 8:48 AM
add_default_labels=False kwarg at least
https://docs.prefect.io/api/latest/storage.html#local
Anna Geller
10/20/2021, 8:59 AM
haf
10/20/2021, 9:27 AM
vegan-bear
Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage')
Anna Geller
10/20/2021, 9:36 AM
haf
10/20/2021, 9:37 AM
from datetime import timedelta
import prefect
from prefect.run_configs import KubernetesRun
from prefect.schedules import IntervalSchedule
from prefect.storage import Local
# args comes from the parser above; flow and at_night() are defined elsewhere in the script
if args.debug:
    prefect.config.logging.level = "DEBUG"
if args.command == "run":
    prefect.context["basepath"] = args.basepath
    print(f"prefect.context.get(basepath)='{prefect.context.get('basepath')}'")
    flow.run(run_on_schedule=args.run_on_schedule)
elif args.command == "register":
    image = f"europe-docker.pkg.dev/logary-delivery/cd/data-pipelines:{args.commit_ref}"
    print(f"Registering flow with labels={args.labels} image={image}")
    flow.schedule = IntervalSchedule(start_date=at_night(), interval=timedelta(hours=24))
    flow.storage = Local(
        path="/app/flows/run_mmm.py",
        stored_as_script=True,
        add_default_labels=False,
    )
    flow.run_config = KubernetesRun(
        image=image,
        labels=args.labels,
    )
    flow.register(
        project_name=args.project_name,
        build=args.build,
        idempotency_key=args.commit_ref,
        labels=args.labels,
        add_default_labels=False,
    )
Anna Geller
10/20/2021, 9:41 AM
haf
10/20/2021, 9:45 AM
Anna Geller
10/20/2021, 9:48 AM
haf
10/20/2021, 9:49 AM
Anna Geller
10/20/2021, 9:50 AM
haf
10/20/2021, 9:51 AM
Anna Geller
10/20/2021, 9:53 AM
prefect agent local start
Then, you don’t even need to pass any storage or agent, because Local storage and agent are the defaults:
# hw_flow.py
from prefect import task, Flow
@task(log_stdout=True)
def hello_world():
    print("hello world")
with Flow("idempotent-flow") as flow:
    hw = hello_world()
Then, you can use the CLI to register your flow to the Prefect Cloud:
prefect register --project YOUR_PROJECT_NAME -p hw_flow.py
prefect auth login --key "YOUR_KEY"
haf
10/20/2021, 9:55 AM
Anna Geller
10/20/2021, 9:56 AM
haf
10/20/2021, 9:56 AM
Anna Geller
10/20/2021, 9:57 AM
haf
10/20/2021, 9:57 AM
Anna Geller
10/20/2021, 9:57 AM
haf
10/20/2021, 9:57 AM
$ docker run --rm -it europe-docker.pkg.dev/logary-delivery/cd/data-pipelines:xxx
_____ _____ ______ ______ ______ _____ _______
| __ \| __ \| ____| ____| ____/ ____|__ __|
| |__) | |__) | |__ | |__ | |__ | | | |
| ___/| _ /| __| | __| | __|| | | |
| | | | \ \| |____| | | |___| |____ | |
|_| |_| \_\______|_| |______\_____| |_|
Thanks for using Prefect!!!
This is the official docker image for Prefect Core, intended for executing
Prefect Flows. For more information, please see the docs:
<https://docs.prefect.io/core/getting_started/installation.html#docker>
root@514926ea8f6a:/app# ls
Pipfile Pipfile.lock dbt dbt_project.yml flows infer packages.yml postinstall.py profiles.yml
root@514926ea8f6a:/app# ls flows
__pycache__ exchange_rates.py run_mmm.py
dask-worker-space exchange_rates__insert_rate.sql run_mmm__metrics_eligible_channels.sql
dbt.py exchange_rates__missing_dates.sql run_mmm__revenues_eligible_apps.sql
root@514926ea8f6a:/app# cd flows
root@514926ea8f6a:/app/flows# l
bash: l: command not found
root@514926ea8f6a:/app/flows# pwd
/app/flows
root@514926ea8f6a:/app/flows# exit
logout
Anna Geller
10/20/2021, 9:58 AM
haf
10/20/2021, 9:59 AM
Anna Geller
10/20/2021, 9:59 AM
haf
10/20/2021, 9:59 AM
FROM prefecthq/prefect:0.15.4-python3.8
RUN pip install --upgrade pip setuptools wheel twine \
&& pip install pipenv \
&& apt-get update \
&& apt-get install -y --no-install-recommends curl gcc python3-dev libssl-dev \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY Pipfile* packages.yml profiles.yml .user.yml .python-version dbt_project.yml postinstall.py ./
COPY infer ./infer
RUN PIPENV_VENV_IN_PROJECT=1 pipenv install --deploy
ENV PATH="/app/.venv/bin:$PATH"
RUN python postinstall.py
COPY flows ./flows
COPY dbt ./dbt
Anna Geller
10/20/2021, 9:59 AM
RUN pip install -r requirements.txt
haf
10/20/2021, 9:59 AM
python postinstall.py
Anna Geller
10/20/2021, 10:01 AM
haf
10/20/2021, 10:01 AM
I have a Pipfile because it makes it more consistent.
If this really is my issue, then I'm happy to discuss the hows and whys of this, but I don't think it is.
Anna Geller
10/20/2021, 10:01 AM
COPY /path/to/your/flow.py .
haf
10/20/2021, 10:06 AM
Anna Geller
10/20/2021, 10:06 AM
haf
10/20/2021, 10:06 AM
Anna Geller
10/20/2021, 10:08 AM
COPY flows .
haf
10/20/2021, 10:08 AM
Anna Geller
10/20/2021, 10:13 AM
stored_as_script=True in Docker storage. I'm not really recommending Docker storage (I think it would be easier with GCS 🙂), but this was your preference, and Docker storage is the easiest to get started with, because you can pass your Dockerfile and it will be built any time you register your flow, so you can ensure all dependencies are baked into the image.
You need to use Docker storage, not Local storage; remember the links to the docs I sent you?
haf
10/20/2021, 10:20 AM
flow.storage = Docker(
    path="/app/flows/run_mmm.py",
    image_name=image_base,
    image_tag=args.commit_ref,
    stored_as_script=True,
    add_default_labels=False,
)
Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage')
Anna Geller
10/20/2021, 10:38 AM
haf
10/20/2021, 10:43 AM
Anna Geller
10/20/2021, 10:45 AM
haf
10/20/2021, 10:45 AM
Anna Geller
10/20/2021, 10:46 AM
haf
10/20/2021, 10:46 AM
"Docker storage is ideal because the image gets rebuilt any time you register your flow"
No, it's not ideal; I don't want this.
requirements.txt but doesn't support editable dependencies.
The .prefect files are being added as part of the build; AFAIK these are the pickled files.
Anna Geller
10/20/2021, 10:49 AM
RUN pip install .
haf
10/20/2021, 10:50 AM
pip install -e .
yes
Anna Geller
10/20/2021, 10:50 AM
haf
10/20/2021, 10:50 AM
pipenv does this as part of pipenv install --deploy
flow.register()
What you're missing is that when you do pip install . you're building a .egg file, but when you do an editable install you're not.
Or, in other words, "editable" just means "load files from disk at runtime", while pip install . means "load it from the egg file".
Anna Geller
10/20/2021, 10:55 AM
haf
10/20/2021, 10:55 AM
Anna Geller
10/20/2021, 10:56 AM
haf
10/20/2021, 10:57 AM
"Docker image is a packaging mechanism by itself"
Yes, but not "package" as in "Python package".
Anna Geller
10/20/2021, 10:59 AM
haf
10/20/2021, 10:59 AM
"The result will be the same: a package is installed in the env so that it can be used by Prefect flows, right?"
But yes, with the Dockerfile I posted, the Python packages can be referenced by Prefect flows, and this is what is needed.
Anna Geller
10/20/2021, 11:02 AM
haf
10/20/2021, 11:03 AM
Anna Geller
10/20/2021, 11:03 AM
haf
10/20/2021, 11:04 AM
Anna Geller
10/20/2021, 11:04 AM
haf
10/20/2021, 11:05 AM
Anna Geller
10/21/2021, 8:43 AM
haf
10/21/2021, 8:44 AM
tini entrypoint, which threw away all ENVs
• this meant pip ran with the system pip, not the venv pip
• you're right that it would have been better to install with requirements.txt, but only insofar as pipenv doesn't tie into pyproject.toml, which seems to be "the way" nowadays after PEP 518: https://www.python.org/dev/peps/pep-0518/
docker = Docker(
    path="/app/flows/run_mmm.py",
    registry_url="ex/cd",
    dockerfile="Dockerfile",
    image_name="data-pipelines",
    image_tag=args.commit_ref,
    ignore_healthchecks=True,
    stored_as_script=True,
)
docker.add_flow(flow)
flow.storage = docker
add_flow
it just crashes with the message I showed you before.
Anna Geller
10/21/2021, 8:50 AM
haf
10/21/2021, 8:59 AM
Anna Geller
10/21/2021, 9:34 AM
haf
10/21/2021, 10:40 AM